<p>
  In this trading strategy we define a class named 'pairs'. We manage pairs instead of individual stocks because this makes it more convenient to calculate correlation and cointegration, update the stock prices in each pair, and trade on the selected pairs.
</p>

<h3>Step 1: Pairs Class Definition</h3>
<p>
  A pair is made up of two stocks, stock A and stock B. The class has several properties. The basic ones are the symbols of stock A and stock B, the pandas DataFrame that holds the timestamps and prices of the two stocks, the current error, the error at the previous datapoint, and the lists that buffer incoming stock prices. Instead of updating the DataFrame every 5 minutes, we record the prices in lists and update the DataFrame once a month. This speeds up the algorithm by at least a factor of 10, because manipulating a DataFrame is very time consuming. The cor_update method is called every month to update the correlation between the two stocks in the pair. The cointegration_test method is also called monthly: it runs an OLS regression, conducts the ADF test on the residual, and calculates the mean and standard deviation of the residual. It then assigns these values to the pair object as properties.
</p>

<div class="section-example-container">

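<pre class="python">import numpy as np
import pandas as pd
import statsmodels.formula.api as sm   # provides sm.ols(formula=..., data=...)
import statsmodels.tsa.stattools as ts  # provides ts.adfuller()


class pairs(object):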
    def __init__(self, a, b):
        self.a = a
        self.b = b
        self.name = str(a) + ':' + str(b)
        self.df = pd.concat([a.df, b.df], axis=1).dropna()
        # The number of bars in the rolling window is determined by the resolution,
        # so we read it from the shape of the DataFrame.
        self.num_bar = self.df.shape[0]
        # .iloc replaces the .ix indexer, which has been removed from pandas
        self.cor = self.df.corr().iloc[0, 1]
        # Set the initial signals to 0
        self.error = 0
        self.last_error = 0
        self.a_price = []
        self.a_date = []
        self.b_price = []
        self.b_date = []

    def cor_update(self):
        self.cor = self.df.corr().iloc[0, 1]

    def cointegration_test(self):
        # Regress the price of stock A on the price of stock B by OLS
        self.model = sm.ols(formula='%s ~ %s' % (str(self.a), str(self.b)), data=self.df).fit()
        # Conduct the ADF test on the residual. ts.adfuller() returns a tuple whose
        # first element is the test statistic.
        self.adf = ts.adfuller(self.model.resid, autolag='BIC')[0]
        self.mean_error = np.mean(self.model.resid)
        self.sd = np.std(self.model.resid)

    def price_record(self,data_a,data_b):
        self.a_price.append(float(data_a.Close))
        self.a_date.append(data_a.EndTime)
        self.b_price.append(float(data_b.Close))
        self.b_date.append(data_b.EndTime)

    def df_update(self):
        new_df = pd.DataFrame({str(self.a): self.a_price, str(self.b): self.b_price},
                              index=self.a_date).dropna()
        self.df = pd.concat([self.df, new_df])
        self.df = self.df.tail(self.num_bar)
        # After updating the DataFrame, empty the lists for the incoming data.
        # Rebinding a loop variable would leave the lists untouched, so clear them in place.
        for buffer in (self.a_price, self.a_date, self.b_price, self.b_date):
            buffer.clear()
</pre>
</div>
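<p>
  As a rough illustration of what cointegration_test stores on the pair, the sketch below fits stock A on stock B by ordinary least squares in plain Python and computes the mean and standard deviation of the residual. The helper name fit_residuals and the prices are made up for this example, and the ADF test itself is left out because it needs statsmodels.
</p>

```python
from statistics import fmean, pstdev


def fit_residuals(prices_a, prices_b):
    """Regress A on B by ordinary least squares.

    Returns (intercept, slope, residuals). Hypothetical helper, not part of
    the strategy code.
    """
    mean_a, mean_b = fmean(prices_a), fmean(prices_b)
    cov_ab = sum((a - mean_a) * (b - mean_b) for a, b in zip(prices_a, prices_b))
    var_b = sum((b - mean_b) ** 2 for b in prices_b)
    slope = cov_ab / var_b
    intercept = mean_a - slope * mean_b
    residuals = [a - (intercept + slope * b) for a, b in zip(prices_a, prices_b)]
    return intercept, slope, residuals


# Made-up prices: A is roughly 2 * B + 1 plus a small disturbance.
prices_b = [10.0, 11.0, 12.0, 13.0, 14.0]
prices_a = [21.1, 22.9, 25.1, 26.9, 29.0]

intercept, slope, residuals = fit_residuals(prices_a, prices_b)
mean_error = fmean(residuals)  # essentially 0 for OLS with an intercept
sd = pstdev(residuals)         # population std, the same convention as np.std's default
print(intercept, slope, mean_error, sd)
```

<p>
  The residual of a new bar is then price_a - (intercept + slope * price_b), which is the quantity the trading steps compare against the thresholds.
</p>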
<h3>Step 2: Generate and Clean Pairs</h3>
<p>
  The function generate_pairs builds pairs from the stock symbols. self.pair_threshold and self.pair_num are set in advance to control the number of candidate pairs. The pairs in self.pair_list are kept and updated throughout the backtesting period. We set self.pair_threshold to 0.88 and self.pair_num to 120 to limit the number of pairs in the list; with too many pairs, the backtest would take too long.
  The function pair_clean is called after the two-stage screen. If the first pair contains stock A and stock B, and the second pair contains stock B and stock C, we remove the second pair because the overlapping signals would disturb the balance of our portfolio.
</p>

<div class="section-example-container">

<pre class="python">def generate_pairs(self):
    for i in range(len(self.symbols)):
        for j in range(i+1,len(self.symbols)):
            self.pair_list.append(pairs(self.symbols[i],self.symbols[j]))

    self.pair_list = [x for x in self.pair_list if x.cor &gt; self.pair_threshold]

    self.pair_list.sort(key = lambda x: x.cor, reverse = True)

    if len(self.pair_list) &gt; self.pair_num:
        self.pair_list = self.pair_list[:self.pair_num]

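def pair_clean(self, pair_list):
    # Keep a pair only if neither of its stocks already appears in a kept pair.
    # The argument is named pair_list so that it does not shadow the built-in list.
    cleaned = []
    for pair in pair_list:
        symbols = [x.a for x in cleaned] + [x.b for x in cleaned]
        if pair.a not in symbols and pair.b not in symbols:
            cleaned.append(pair)
    return cleaned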
</pre>
</div>
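<p>
  To see the overlap rule in isolation, here is a small standalone sketch that works on plain (A, B) tuples instead of pair objects. The function name remove_overlapping and the symbols are hypothetical; the input is assumed to be sorted from best to worst, as the selected pairs are in the strategy.
</p>

```python
def remove_overlapping(ranked_pairs):
    """Keep each pair only if neither symbol appears in an earlier kept pair."""
    kept = []
    used = set()
    for a, b in ranked_pairs:
        if a not in used and b not in used:
            kept.append((a, b))
            used.update((a, b))
    return kept


# Pairs ranked from best to worst.
ranked = [("A", "B"), ("B", "C"), ("C", "D"), ("A", "D")]
cleaned = remove_overlapping(ranked)
print(cleaned)
```

<p>
  Here ("B", "C") is dropped because B is already in use, ("C", "D") survives because C was never kept, and ("A", "D") is dropped because both symbols are taken.
</p>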

<h3>Step 3: Warming up Period</h3>
<p>
  This part runs inside the OnData step. We set self.num_bar to the number of TradeBars in three months, which depends on the resolution. During the warm-up period we collect the stock prices in lists and attach each stock's price list to its symbol as a property. We also remove a symbol from the symbol list if it has no data.
</p>

<div class="section-example-container">

<pre class="python">if len(self.symbols[0].prices) &lt; self.num_bar:
    # Iterate over a copy so that removing a symbol does not skip the next one
    for symbol in list(self.symbols):
        if data.ContainsKey(symbol):
            symbol.prices.append(float(data[symbol].Close))
            symbol.dates.append(data[symbol].EndTime)
        else:
            self.Log('%s is missing' % str(symbol))
            self.symbols.remove(symbol)
    self.data_count = 0
    return</pre>
</div>
<h3>Step 4: Pairs Selection</h3>
<p>
  This process also runs inside the OnData step. If this is the first trading period of the algorithm, it generates the pairs. Otherwise, it updates the DataFrame and the correlation coefficient of each pair in self.pair_list. After that, the pairs with a correlation coefficient higher than 0.9 are put into self.selected_pair. All pairs in self.selected_pair are then tested for cointegration, and the pairs with an ADF test value less than -3.34 make it into the final list. This step also limits the number of pairs in the final list; by default we set self.selected_num to 10. self.count counts the datapoints we have received. Once it reaches the number of datapoints in one month, one trading period has passed and it is reset to 0.
</p>

<div class="section-example-container">

<pre class="python">if self.count == 0 and len(self.symbols[0].prices) == self.num_bar:
    if self.generate_count == 0:
        for symbol in self.symbols:
            symbol.df = pd.DataFrame(symbol.prices, index=symbol.dates, columns=['%s' % str(symbol)])

        self.generate_pairs()
        self.generate_count += 1
        self.Log('pair list length:' + str(len(self.pair_list)))

        for pair in self.pair_list:
            pair.cor_update()
    # Update the DataFrame and the correlation of each pair
    if len(self.pair_list[0].a_price) != 0:
        for pair in self.pair_list:
            pair.df_update()
            pair.cor_update()
    self.selected_pair = [x for x in self.pair_list if x.cor &gt; 0.9]
    # Cointegration test
    for pair in self.selected_pair:
        pair.cointegration_test()

    # self.BIC is the ADF threshold from the description above (-3.34)
    self.selected_pair = [x for x in self.selected_pair if x.adf &lt; self.BIC]
    self.selected_pair.sort(key = lambda x: x.adf)
    # If no pair passed the two-stage test, return.
    if len(self.selected_pair) == 0:
        self.Log('no selected pair')
        self.count += 1
        return
    # clean the pair to avoid overlapping stocks.
    self.selected_pair = self.pair_clean(self.selected_pair)
    # assign a property to the selected pair, this is a signal that would be used for trading.
    for pair in self.selected_pair:
        pair.touch = 0
        self.Log(str(pair.adf) + pair.name)
    # limit the number of selected pairs.
    if len(self.selected_pair) &gt; self.selected_num:
        self.selected_pair = self.selected_pair[:self.selected_num]

    self.count +=1
    self.data_count = 0
    return
</pre>
</div>
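<p>
  The two-stage screen can be sketched on its own with made-up numbers. The Candidate tuple, the function name two_stage_screen and the sample values below are assumptions for illustration only; the thresholds 0.9, -3.34 and 10 are the ones quoted in the text. The overlap cleaning step is left out to keep the sketch short.
</p>

```python
from collections import namedtuple

# Hypothetical stand-in for a pair object: a name, its correlation coefficient,
# and the ADF test statistic of its regression residual.
Candidate = namedtuple("Candidate", ["name", "cor", "adf"])


def two_stage_screen(candidates, cor_min=0.9, adf_max=-3.34, limit=10):
    """Filter by correlation, then by ADF statistic, and keep the best `limit`."""
    stage_one = [c for c in candidates if c.cor > cor_min]
    stage_two = [c for c in stage_one if c.adf < adf_max]
    # A more negative ADF statistic is stronger evidence of stationarity.
    stage_two.sort(key=lambda c: c.adf)
    return stage_two[:limit]


candidates = [
    Candidate("G:H", cor=0.91, adf=-3.80),
    Candidate("C:D", cor=0.85, adf=-5.00),  # fails the correlation stage
    Candidate("E:F", cor=0.93, adf=-2.50),  # fails the cointegration stage
    Candidate("A:B", cor=0.95, adf=-4.10),
]
selected = two_stage_screen(candidates)
print([c.name for c in selected])
```

<p>
  Note that C:D is rejected despite having the most negative ADF statistic, because the correlation filter runs first.
</p>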

<h3>Step 5: Trade Period</h3>
<p>
  Pasting all the code for the trading period in one block would make it too long to read, so we split it into three parts: updating pairs, opening pairs trades and closing pairs trades. All of these lines run inside the OnData step under the condition: if self.count != 0 and self.count &lt; self.one_month. This condition means we are in the trading period.
</p>

<h4>Updating Pairs</h4>
<p>
  This step records the latest stock prices for each pair. It also stores the current error of each pair in the signal called 'last_error'; immediately afterwards, the pairs receive their new signals.
</p>

<div class="section-example-container">

<pre class="python">num_select = len(self.selected_pair)
# Iterate over a copy so that removing a pair does not skip the next one
for pair in list(self.pair_list):
    if data.ContainsKey(pair.a) and data.ContainsKey(pair.b):
        pair.price_record(data[pair.a], data[pair.b])
    else:
        self.Log('%s has no data' % str(pair.name))
        self.pair_list.remove(pair)

for pair in self.selected_pair:
    pair.last_error = pair.error

for pair in self.trading_pairs:
    pair.last_error = pair.error</pre>
</div>
<h4>Opening Pairs Trading</h4>
<p>
  For each pair in self.selected_pair, we receive the current prices of the stocks and use the cointegration model to calculate the residual \(\epsilon\), which is assigned to the pair as a property named 'error'. self.trading_pairs is a list that stores the pairs currently being traded. Once a pairs trade is opened, the pair is added to the list, and it is removed when the trade is closed. The property 'touch' is a signal. If the residual \(\epsilon\) crosses above the positive threshold (we set it to \(2.23\sigma\) here), the signal becomes +1; if it crosses below the negative threshold (\(-2.23\sigma\)), the signal becomes -1. For pairs with a +1 signal, the residual crossing back below the positive threshold is the signal to open a trade: we long stock B and short stock A. For pairs with a -1 signal, when the residual crosses back above the negative threshold, we long stock A and short stock B.
  When we open a trade, we need to record the current model and the current mean and standard deviation of the residual. This is necessary because if we enter a new trading period before the trade has been closed, the cointegration model, mean and standard deviation of the pair will have changed, and we need the original thresholds to close the trade. When adding the pair to self.trading_pairs, we also reset the signal 'touch' to 0 for further use.
</p>

<div class="section-example-container">

<pre class="python">for i in self.selected_pair:
    price_a = float(data[i.a].Close)
    price_b = float(data[i.b].Close)
    i.error = price_a - (i.model.params[0] + i.model.params[1]*price_b)
    # Opening thresholds around the mean of the residual
    upper = i.mean_error + self.open_size*i.sd
    lower = i.mean_error - self.open_size*i.sd
    if (self.Portfolio[i.a].Quantity == 0 and self.Portfolio[i.b].Quantity == 0) and i not in self.trading_pairs:
        if i.touch == 0:
            # The residual leaves the band: arm the signal
            if i.error &lt; lower and i.last_error &gt; lower:
                i.touch += -1
            elif i.error &gt; upper and i.last_error &lt; upper:
                i.touch += 1
        elif i.touch == -1:
            # The residual crosses back above the lower threshold: long A, short B
            if i.error &gt; lower and i.last_error &lt; lower:
                self.Log('long %s and short %s' % (str(i.a), str(i.b)))
                i.record_model = i.model
                i.record_mean_error = i.mean_error
                i.record_sd = i.sd
                self.trading_pairs.append(i)
                self.SetHoldings(i.a, 5.0/(len(self.selected_pair)))
                self.SetHoldings(i.b, -5.0/(len(self.selected_pair)))
                i.touch = 0
        elif i.touch == 1:
            # The residual crosses back below the upper threshold: long B, short A
            if i.error &lt; upper and i.last_error &gt; upper:
                self.Log('long %s and short %s' % (str(i.b), str(i.a)))
                i.record_model = i.model
                i.record_mean_error = i.mean_error
                i.record_sd = i.sd
                self.trading_pairs.append(i)
                self.SetHoldings(i.b, 5.0/(len(self.selected_pair)))
                self.SetHoldings(i.a, -5.0/(len(self.selected_pair)))
                i.touch = 0
</pre>
</div>
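<p>
  The 'touch' logic is a small state machine, and it may be easier to follow when separated from the portfolio calls. The sketch below is a simplified standalone version: the function name step_signal, the action strings and the residual path are invented for illustration, while the 2.23 multiplier is the one quoted in the text.
</p>

```python
def step_signal(touch, last_error, error, mean, sd, open_size=2.23):
    """Advance the touch signal by one bar.

    Returns (new_touch, action), where action is None or a string naming the
    trade to open. Hypothetical helper, not part of the strategy code.
    """
    upper = mean + open_size * sd
    lower = mean - open_size * sd
    if touch == 0:
        if last_error > lower and error < lower:
            return -1, None  # residual fell below the lower threshold
        if last_error < upper and error > upper:
            return 1, None   # residual rose above the upper threshold
    elif touch == -1:
        if last_error < lower and error > lower:
            return 0, "long A / short B"
    elif touch == 1:
        if last_error > upper and error < upper:
            return 0, "long B / short A"
    return touch, None


# Made-up residual path with mean 0 and sd 1: it rises above +2.23,
# stays there for one bar, then falls back below the threshold.
errors = [0.5, 1.8, 2.6, 2.4, 2.0]
touch, last_error, actions = 0, 0.0, []
for error in errors:
    touch, action = step_signal(touch, last_error, error, mean=0.0, sd=1.0)
    actions.append(action)
    last_error = error
print(actions)
```

<p>
  The trade is opened only on the last bar, when the residual comes back inside the band, not on the bar where it first breaks out.
</p>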

<h4>Closing Pairs Trading</h4>
<p>
  This part controls the exit from a pairs trade. It works much like the opening part. It uses the recorded original model and thresholds to decide whether we should close the position. If the residual \(\epsilon\) reaches our closing threshold, we liquidate stock A and stock B to close the trade. If the residual keeps deviating from the mean and goes too far, we also close the position as a stop loss. When we close a pairs trade, we also remove the pair from self.trading_pairs.
</p>

<div class="section-example-container">

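<pre class="python"># Iterate over a copy so that removing a pair does not skip the next one
for i in list(self.trading_pairs):
    price_a = float(data[i.a].Close)
    price_b = float(data[i.b].Close)
    i.error = price_a - (i.record_model.params[0] + i.record_model.params[1]*price_b)
    upper_close = i.record_mean_error + self.close_size*i.record_sd
    lower_close = i.record_mean_error - self.close_size*i.record_sd
    # Part of this block was lost in the source. The second crossing test, the body of the
    # first branch and the start of the stop-loss test are reconstructed by symmetry with
    # the surviving code and should be read as an assumption.
    if ((i.error &lt; upper_close and i.last_error &gt; upper_close) or
            (i.error &gt; lower_close and i.last_error &lt; lower_close)):
        self.Log('close %s' % str(i.name))
        self.Liquidate(i.a)
        self.Liquidate(i.b)
        self.trading_pairs.remove(i)
    elif (i.error &lt; i.record_mean_error - self.stop_loss*i.record_sd or
            i.error &gt; i.record_mean_error + self.stop_loss*i.record_sd):
        self.Log('close %s to stop loss' % str(i.name))
        self.Liquidate(i.a)
        self.Liquidate(i.b)
        self.trading_pairs.remove(i)</pre>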
</div>
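<p>
  The exit rule described above can be summarised as a pure function of the previous and current residual. This is a sketch under assumptions: the function name exit_decision and the return strings are invented, and the values 0.5 for close_size and 6.0 for stop_loss are placeholders, since the text does not state the values used in the strategy.
</p>

```python
def exit_decision(last_error, error, mean, sd, close_size, stop_loss):
    """Return 'close', 'stop_loss' or 'hold' for one open pair.

    mean and sd are the values recorded when the trade was opened.
    Hypothetical helper, not part of the strategy code.
    """
    upper_close = mean + close_size * sd
    lower_close = mean - close_size * sd
    crossed_down = last_error > upper_close and error < upper_close
    crossed_up = last_error < lower_close and error > lower_close
    if crossed_down or crossed_up:
        return "close"
    if error > mean + stop_loss * sd or error < mean - stop_loss * sd:
        return "stop_loss"
    return "hold"


# Placeholder thresholds: close at 0.5 sd, stop out at 6 sd.
params = dict(mean=0.0, sd=1.0, close_size=0.5, stop_loss=6.0)
reverted = exit_decision(0.8, 0.3, **params)   # crossed back below +0.5 sd
blown_out = exit_decision(5.5, 6.5, **params)  # moved beyond +6 sd
waiting = exit_decision(1.5, 1.2, **params)    # still between the thresholds
print(reverted, blown_out, waiting)
```

<p>
  Because the function takes the recorded mean and standard deviation as arguments, the decision does not change when the pair's model is re-estimated at the start of a new trading period.
</p>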
